#pragma once
#include <iostream>
#include <string>
#include <vector>
#include <cassert>
#include <cstring>
#include <algorithm>
#include <fcntl.h>
#include <unistd.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <pthread.h>
#include <functional>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <ctime>
#include <sys/timerfd.h>
#include <unordered_map>
#include <signal.h>
#include <mutex>
#include <thread>
#include <condition_variable>
#include <sys/eventfd.h>


#define INF 0
#define DBG 1
#define ERR 2
#define DEFAULT_LOG_LEVEL DBG
#define LOG(level, format, ...) {\
            if (level >= DEFAULT_LOG_LEVEL) {\
            time_t t = time(NULL);\
            struct tm *m = localtime(&t);\
            char ts[32] = {0};\
            strftime(ts, 31, "%H:%M:%S", m);\
            fprintf(stdout, "[%p %s %s:%d] " format "\n", (void*)pthread_self(), ts, __FILE__, __LINE__, ##__VA_ARGS__);\
        }\
}
#define ILOG(format, ...) LOG(INF, format, ##__VA_ARGS__);
#define DLOG(format, ...) LOG(DBG, format, ##__VA_ARGS__);
#define ELOG(format, ...) LOG(ERR, format, ##__VA_ARGS__);

#define BUFFER_DEFAULE_SIZE 1024
class Buffer
{
private:
    std::vector<char> _buffer; // 缓冲区
    uint64_t _read_idx;        // 读偏移
    uint64_t _write_idx;       // 写偏移
public:
    Buffer() : _buffer(BUFFER_DEFAULE_SIZE), _read_idx(0), _write_idx(0) {}
    // 返回数组起始地址
    char *Begin() { return &*_buffer.begin(); }
    // 获取当前读位置
    char *ReadPosition() { return Begin() + _read_idx; }
    // 获得当前写位置
    char *WritePosition() { return Begin() + _write_idx; }
    // 获得末尾空闲空间 -- 总空间-写偏移
    uint64_t TailIdleSpace() { return _buffer.size() - _write_idx; }
    // 获得头部空闲空间 -- 读偏移
    uint64_t HeadIdleSpace() { return _read_idx; }
    // 获取可读数据大小 -- 写偏移-读偏移
    uint64_t ReadAbleSize() { return _write_idx - _read_idx; }
    // 将读偏移向后移动
    void MoveReadOffSet(uint64_t len)
    {
        assert(len <= ReadAbleSize());
        _read_idx += len;
    }
    // 将写偏移向后移动
    void MoveWriteOffSet(uint64_t len)
    {
        assert(len <= TailIdleSpace());
        _write_idx += len;
    }
    // 确保可写空间足够（头部和尾部的空间够则移动数据，不够则扩容）
    void EnsureWriteSpace(uint64_t len)
    {
        if (len <= TailIdleSpace())
            return;
        else if (len <= TailIdleSpace() + HeadIdleSpace())
        {
            // 将数据拷贝到最前面
            int sz = ReadAbleSize();
            std::copy(ReadPosition(), ReadPosition() + sz, Begin());
            // 设置读写偏移
            _read_idx = 0;
            _write_idx = sz;
        }
        else
        {
            // 扩容
            _buffer.resize(_write_idx + len);
        }
    }
    // 写入数据
    void Write(const void *data, uint64_t len)
    {
        // 1.保证写空间足够 2.拷贝
        if (len == 0)
            return;
        EnsureWriteSpace(len);
        const char *d = (const char *)data;
        std::copy(d, d + len, WritePosition());
    }
    void WriteAndPush(const void *data, uint64_t len) {
        Write(data, len);
        MoveWriteOffSet(len);
    }
    // 通过string写入
    void WriteString(const std::string &str, uint64_t len)
    {
        Write(str.c_str(), len);
    }
    void WriteStringAndPush(const std::string &str, uint64_t len)
    {
        WriteString(str, len);
        MoveWriteOffSet(len);
    }
    // 通过buffer写入
    void WriteBuffer(Buffer &buf)
    {
        Write(buf.ReadPosition(), buf.ReadAbleSize());
    }
    void WriteBufferAndPush(Buffer &buf)
    {
        WriteBuffer(buf);
        MoveWriteOffSet(buf.ReadAbleSize());
    }
    // 读取数据
    void Read(void *buf, uint64_t len)
    {
        // 保证读取空间要够
        assert(ReadAbleSize() >= len);
        std::copy(ReadPosition(), ReadPosition() + len, (char *)buf);
    }
    // 通过string读取数据
    std::string ReadString(uint64_t len)
    {
        assert(len <= ReadAbleSize());
        std::string str;
        str.resize(len);
        Read(&str[0], len);
        return str;
    }
    std::string ReadStringAndPop(uint64_t len)
    {
        assert(len <= ReadAbleSize());
        std::string str = ReadString(len);
        MoveReadOffSet(len);
        return str;
    }
    // 寻找换行符
    char *FindCRLF()
    {
        void *res = memchr(ReadPosition(), '\n', ReadAbleSize());
        return (char *)res;
    }
    // 获取一行
    std::string GetLine()
    {
        char *res = FindCRLF();
        if (res == nullptr)
            return "";
        //+1的目的是为了把换行符也取出来
        return ReadString(res - ReadPosition() + 1);
    }
    std::string GetLineAndPop()
    {
        std::string ret = GetLine();
        MoveReadOffSet(ret.size());
        return ret;
    }
    // 清空缓冲区 -- 把读写偏移放到0即可
    void Clear()
    {
        _read_idx = 0;
        _write_idx = 0;
    }
};

#define MAX_LISTEN 1024

class Socket
{
private:
    int _sockfd; // 套接字
public:
    Socket() : _sockfd(-1) {}
    Socket(int sock) : _sockfd(sock) {}
    ~Socket() { Close(); }
    int Fd() { return _sockfd; }
    // 创建套接字
    bool Create()
    {
        // int socket(int domain, int type, int protocol);
        _sockfd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
        if (_sockfd < 0)
        {
            ELOG("CREATE SOCK FAILED!");
            return false;
        }
        return true;
    }
    // 绑定地址
    bool Bind(const std::string &ip, uint16_t port)
    {
        // int bind(int sockfd, const struct sockaddr *addr,socklen_t addrlen);
        struct sockaddr_in addr;
        addr.sin_family = AF_INET;
        addr.sin_port = htons(port);
        addr.sin_addr.s_addr = inet_addr(ip.c_str());
        socklen_t len = sizeof(struct sockaddr_in);
        int ret = bind(_sockfd, (struct sockaddr *)&addr, len);
        if (ret < 0)
        {
            ELOG("BIND FAILED! %s",strerror(errno));
            return false;
        }
        return true;
    }
    // 监听套接字
    bool Listen(int backlog = MAX_LISTEN)
    {
        // int listen(int sockfd, int backlog);
        int ret = listen(_sockfd, backlog);
        if (ret < 0)
        {
            ELOG("LISTEN FAILED!");
            return false;
        }
        return true;
    }
    // 连接
    bool Connect(const std::string &ip, uint16_t port)
    {
        // int connect(int sockfd, const struct sockaddr *addr,socklen_t addrlen);
        struct sockaddr_in addr;
        addr.sin_family = AF_INET;
        addr.sin_port = htons(port);
        addr.sin_addr.s_addr = inet_addr(ip.c_str());
        socklen_t len = sizeof(struct sockaddr_in);
        int ret = connect(_sockfd, (struct sockaddr *)&addr, len);
        if (ret < 0)
        {
            ELOG("Connect FAILED!");
            return false;
        }
        return true;
    }
    // 获取新连接
    int Accept()
    {
        // int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen);
        // 因为我们不需要知道远端的ip和端口，所以后两个设置为空即可
        int newsock = accept(_sockfd, nullptr, nullptr);
        if (newsock < 0)
        {
            ELOG("ACCEPT FAILED!");
            return -1;
        }
        return newsock;
    }
    // 接收数据
    ssize_t Recv(void *buf, size_t len, int flag = 0)
    {
        // ssize_t recv(int sockfd, void *buf, size_t len, int flags);
        ssize_t ret = recv(_sockfd, buf, len, flag);
        if (ret <= 0)
        {
            // 这里有可能是非阻塞或者是信号导致的
            if (ret == EAGAIN || ret == EINTR)
                return 0;
            ELOG("RECV FAILED!");
            return -1;
        }
        return ret; // 返回实际接收数据大小
    }
    // 设置非阻塞接收数据
    ssize_t NonBlockRecv(void *buf, size_t len)
    {
        return Recv(buf, len, MSG_DONTWAIT);
    }
    // 发送数据
    ssize_t Send(const void *buf, size_t len, int flag = 0)
    {
        // ssize_t send(int sockfd, const void *buf, size_t len, int flags);
        const char* str = (const char*)buf;
        ssize_t ret = send(_sockfd, buf, len, flag);
        if (ret <= 0)
        {
            if (ret == EAGAIN || ret == EINTR)
                return 0;
            ELOG("Send FAILED!");
            return -1;
        }
        return ret; // 返回实际发送数据大小
    }
    // 设置非阻塞发送数据
    ssize_t NonBlockSend(void *buf, size_t len)
    {
        return Send(buf, len, MSG_DONTWAIT);
    }
    // 关闭套接字
    void Close()
    {
        if (_sockfd != -1)
        {
            close(_sockfd);
            _sockfd = -1;
        }
    }
    // 建立服务端连接
    bool CreateSerber(uint16_t port, const std::string &ip = "0.0.0.0", bool flag = false)
    {
        // 1.创建套接字 2.绑定地址 3.监听 4.设置地址重用 5.设置非阻塞
        if (Create() == false)
            return false;
        // 这里可以提供用户选择，非阻塞或者阻塞
        if (flag)
            SetNonBlock();
        if (Bind(ip, port) == false)
            return false;
        if (Listen() == false)
            return false;
        ReuseAddr();
        return true;
    }
    // 创建客户端连接
    bool CreateClient(uint16_t port, const std::string &ip)
    {
        // 1.创建套接字 2.连接
        if (Create() == false)
            return false;
        if (Connect(ip, port) == false)
            return false;
        return true;
    }
    // 设置非阻塞
    void SetNonBlock()
    {
        // int fcntl(int fd, int cmd, ... /* arg */ );
        int flag = fcntl(_sockfd, F_GETFL, 0);
        fcntl(_sockfd,F_SETFL,flag | O_NONBLOCK);
    }
    // 开启地址重用
    void ReuseAddr()
    {
        // int setsockopt(int sockfd, int level, int optname,const void *optval, socklen_t optlen);
        int opt = 1;
        //设置地址重用
        int ret = setsockopt(_sockfd, SOL_SOCKET, SO_REUSEADDR, (void *)&opt, sizeof(int));
        if(ret < 0)
        {
            DLOG("SET REUSERADDR FAILED! %s",strerror(errno));
        }
        int val = 1;
        //设置端口重用
        setsockopt(_sockfd, SOL_SOCKET, SO_REUSEPORT, (void *)&val, sizeof(int));
    }
};

class Poller;
class EventLoop;

class Channel
{
private:
    int _fd; //监控的描述符
    EventLoop* _loop; 
    uint32_t _event; //监控事件
    uint32_t _revent; //已经发生的事件
    //各种回调函数
    using EventCallback = std::function<void()>;
    EventCallback _read_callback;   //读事件回调
    EventCallback _write_callback; //写事件回调
    EventCallback _error_callback; //错误事件回调
    EventCallback _close_callback; //关闭事件回调
    EventCallback _event_callback; //任意事件回调
public:
    Channel(EventLoop* loop,int fd):_fd(fd),_event(0),_revent(0),_loop(loop){}
    int Fd() { return _fd; }
    uint32_t Events() { return _event; }
    void SetRevent(uint32_t revent) { _revent = revent; }
    //设置各个回调函数
    void SetReadCallback(const EventCallback& cb) { _read_callback = cb; }
    void SetWriteCallback(const EventCallback& cb) { _write_callback = cb; }
    void SetErrorCallback(const EventCallback& cb) { _error_callback = cb; }
    void SetCloseCallback(const EventCallback& cb) { _close_callback = cb; }
    void SetEventCallback(const EventCallback& cb) { _event_callback = cb; }
    //当前是否监控了可读
    bool ReadAble() { return _event & EPOLLIN;}
    //当前是否监控了可写
    bool WriteAble() { return _event & EPOLLOUT;}
    //启动可读事件监控
    void SetRead() { _event |= EPOLLIN; Update();}
    //启动可写事件监控
    void SetWrite() { _event |= EPOLLOUT; Update();}
    //关闭可读事件监控
    void CloseRead() { _event &= ~EPOLLIN ; Update();}
    //关闭可写事件监控
    void CloseWrite() { _event &= ~EPOLLOUT; Update();}
    //关闭全部事件监控
    void CloseEvent() { _event = 0; Update();}
    //移除监控
    void Remove();
    void Update();
    //处理任意事件
    void HandleEvent()
    {
        //这里因为是把销毁任务压入到任务池中执行，所以，这里可以直接执行任务而不需要先刷新活跃度
        //满足条件，都会触发的
        if((_revent & EPOLLIN) || (_revent & EPOLLRDHUP) || (_revent & EPOLLPRI))
        {
            if(_read_callback) _read_callback();
        }
        //有可能释放连接的操作，一次只能处理一个
        if(_revent & EPOLLOUT)
        {
            if(_write_callback) _write_callback();
        }
        else if(_revent & EPOLLERR)
        {
            if(_error_callback) _error_callback();
        }
        else if(_revent & EPOLLHUP)
        {
            if(_close_callback) _close_callback();
        }
        //所有事件处理过都需要刷新活跃度
        if(_event_callback) _event_callback();
    }
};


#define MAX_EVENTSIZE 1024

class Poller
{
private:
    int _epfd; 
    struct epoll_event _evs[MAX_EVENTSIZE];
    std::unordered_map<int,Channel*> _channels; //描述符和channel的映射关系
private:
    //对epoll的操作
    void Update(Channel* channel,int op){
        // int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
        int fd = channel->Fd();
        struct epoll_event ev;
        ev.data.fd = fd;
        ev.events = channel->Events();
        int ret = epoll_ctl(_epfd,op,fd,&ev);
        if(ret < 0)
        {
            ELOG("_epfd:%d op:%d events:%d ",_epfd,op,ev.events);
            ELOG("EPOLL_CTL FAILED! %s",strerror(errno));
        }
    }
    //判断一个事件是否被监控
    bool HasChannel(Channel* channel)
    {
        auto it = _channels.find(channel->Fd());
        if(it != _channels.end())
        {
            return true;
        }
        return false;
    }
public:
    Poller()
    {
        //创建一个epoll模型
        _epfd = epoll_create(MAX_EVENTSIZE);
        if(_epfd < 0)
        {
            ELOG("EPOLL_CREATE FAILED:%s",strerror(errno));
            abort(); //退出程序
        }
    }
    //添加或者修改监控事件
    void UpdateEvent(Channel* channel)
    {
        //在channels中找不到就添加，找到就修改
        if(HasChannel(channel))
        {
            return Update(channel,EPOLL_CTL_MOD);
        }
        _channels.insert(std::make_pair(channel->Fd(),channel));
        return Update(channel,EPOLL_CTL_ADD);
    }
    //移除监控事件
    void RemoveEvent(Channel* channel)
    {
        auto it = _channels.find(channel->Fd());
        if(it != _channels.end()) _channels.erase(it); 
        //移除监控事件
        Update(channel,EPOLL_CTL_DEL);
    }
    //开始监控，返回活跃连接
    void Poll(std::vector<Channel*>* active)
    {
        //int epoll_wait(int epfd, struct epoll_event *events,int maxevents, int timeout);
        int nfds = epoll_wait(_epfd,_evs,MAX_EVENTSIZE,-1);
        if(nfds < 0)
        {
            //这里有可能是信号打断
            if(errno == EINTR)
            {
                return;
            }
            //其他原因
            ELOG("EPOLL_WAIT FAILED! %s",strerror(errno));
            abort(); //退出程序
        }
        for(int i = 0;i<nfds;++i)
        {
            auto it = _channels.find(_evs[i].data.fd);
            assert(it != _channels.end());
            //向channel设置就绪事件
            it->second->SetRevent(_evs[i].events);
            //向外输出活跃事件的channel
            active->push_back(it->second);
        }
    }
};

using OnCloseTime = std::function<void()>; //定时器要执行的任务
using ReleaseTime = std::function<void()>; //删除时间管理对象中weak_ptr的信息

//定时器任务对象
class TimeTask
{
private:
    uint32_t _timeout; //超时时间
    uint64_t _id; //每个任务的id
    bool _cancel; //取消定时任务
    OnCloseTime _close_cb; //销毁定时任务的回调
    ReleaseTime _release_cb; //因为时间轮中会记录一个weak_ptr对象，所以最后需要销毁
public:
    TimeTask(uint32_t timeout,uint64_t id,const OnCloseTime& close_cb):_timeout(timeout),_id(id),_close_cb(close_cb),_cancel(false){}
    ~TimeTask(){ 
        if(_cancel== false) _close_cb();
        _release_cb();
    }
    uint64_t id() { return _id; }
    void SetRelease(const ReleaseTime& release_cb) { _release_cb = release_cb; }
    uint32_t Delay() { return _timeout; }
    void Cancel() { _cancel = true; }
};

//时间轮管理对象
class TimeWheel
{
private:
    using WeakTask = std::weak_ptr<TimeTask>; //使用weak_ptr防止在shared_ptr直接对对象的操作
    using PtrTask = std::shared_ptr<TimeTask>; //使用shared_ptr保证释放时不到0不销毁
    int _capacity; //记录时间轮的大小
    int _tick; //记录当前指针指向的时间，指到哪里，销毁哪里
    std::vector<std::vector<PtrTask>> _wheel; //时间轮
    std::unordered_map<uint64_t,WeakTask> _times; //记录id和weak_ptr之间的映射关系
    int _timerfd; //定时器描述符
    EventLoop* _loop;
    std::unique_ptr<Channel> _timer_channel; //用于定时器事件管理
private:
    void RemoveTask(uint64_t id)
    {
        auto it = _times.find(id);
        if(it != _times.end())
        {
            _times.erase(it);
        }
    }
    static int CreateTimerFd()
    {
        //int timerfd_create(int clockid, int flags);
        int timerfd = timerfd_create(CLOCK_MONOTONIC,0);
        if(timerfd < 0)
        {
            ELOG("timerfd_create fail");
            abort();
        }
        //int timerfd_settime(int fd, int flags,const struct itimerspec *new_value,struct itimerspec *old_value);
        //设置结构体
        struct itimerspec itim;
        itim.it_value.tv_sec = 1;
        itim.it_value.tv_nsec = 0; //设置第一次超时时间
        itim.it_interval.tv_sec = 1;
        itim.it_interval.tv_nsec = 0; //第一次超时之后每隔1秒超时一次
        timerfd_settime(timerfd,0,&itim,nullptr);
        return timerfd;
    }
    int ReadTimerFd()
    {
        uint64_t times;
        int ret = read(_timerfd,&times,8);
        if(ret < 0){
            ELOG("READTIMERFD FAILED!");
            abort();
        }
        return times;
    }
    void RunTask()
    {
        _tick = (_tick+1)%_capacity;
        _wheel[_tick].clear();
    }
    void Ontime()
    {
        //读取timerfd中内容，根据实时的超时次数执行任务,这里防止服务器因为处理繁忙而导致这里只进行了一次的的刷新，必须要刷新够次数
        int times = ReadTimerFd();
        for(int i = 0;i<times;++i)
        {
            RunTask();
        }
    }
    void AddTaskInLoop(uint64_t id,uint32_t delay,const OnCloseTime close_cb)
    {
        PtrTask pt(new TimeTask(delay,id,close_cb));
        //设置ReleaseTask
        pt->SetRelease(std::bind(&TimeWheel::RemoveTask,this,id));
        //把任务添加到数组中
        int pos = (_tick + delay) %_capacity;
        _wheel[pos].push_back(pt);
        //将id和weakTask映射关联起来
        _times[id] = WeakTask(pt);
    }
    void CancelTaskInLoop(uint64_t id)
    {
        //通过id找到任务，如果没有直接返回，有的话将标志置为true
        auto it = _times.find(id);
        if(it == _times.end()) return;
        PtrTask pt = it->second.lock(); //获得weak_ptr中的shared_ptr
        if(pt) pt->Cancel();
    }
    void RefreshTaskInLoop(uint64_t id)
    {
        //创建一个新的智能指针对象，然后添加到数组中
        //如果在原数组中没有找到，那么直接返回
        auto it = _times.find(id);
        if(it == _times.end()) return;
        PtrTask pt = it->second.lock(); //获得weak_ptr中的shared_ptr
        int delay = pt->Delay();
        int pos = (_tick + delay) %_capacity;
        _wheel[pos].push_back(pt);
    }
public:
    TimeWheel(EventLoop* loop):_tick(0),_capacity(60),_wheel(_capacity),_timerfd(CreateTimerFd())
    ,_loop(loop),_timer_channel(new Channel(_loop,_timerfd))
    {
        //设置读回调，并启动读监控
        _timer_channel->SetReadCallback(std::bind(&TimeWheel::Ontime,this));
        _timer_channel->SetRead();
    }
    //因为当前类中有使用到数据结构，为了保证线程安全而又不用加锁的方式来提高效率，那么我们让其在一个线程中执行
    void AddTask(uint64_t id,uint32_t delay,const OnCloseTime close_cb);
    void CancelTask(uint64_t id);
    void RefreshTask(uint64_t id);
    //这个接口存在线程安全问题，只能在EventLoop模块中使用
    bool HasTimer(uint64_t id)
    {
        auto it = _times.find(id);
        if(it == _times.end())
        {
            return false;
        }
        return true;
    }
};

class EventLoop
{
private:
    using Func = std::function<void()>; 
    std::thread::id _thread_id; //线程ID
    Poller _poller;
    int _event_fd; 
    std::unique_ptr<Channel> _event_channel; //通过channel来管理eventfd
    std::vector<Func> _tasks; //任务队列
    std::mutex _mutex; //保证任务队列的线程安全
    TimeWheel _time_wheel; //时间轮
private:
    static int CreateEventfd()
    {
        int efd = eventfd(0,EFD_CLOEXEC | EFD_NONBLOCK);
        if(efd < 0)
        {
            ELOG("eventfd failed!");
            abort(); //退出程序
        }
        return efd;
    }
    void ReadEventFd()
    {
        uint64_t res = 0;
        int ret = read(_event_fd,&res,sizeof(res));
        if(ret < 0)
        {
            //信号打断或者读阻塞
            if(errno == EINTR || errno == EAGAIN)
            {
                return;
            }
            ELOG("READEVENTFD FAILED!");
            abort();
        }
    }
    void WeakUpEventFd()
    {
        uint64_t val = 1;
        int ret = write(_event_fd,&val,sizeof(val));
        if(ret < 0)
        {
            if(errno == EINTR)
            {
                return;
            }
            ELOG("WEAKUPEVENTFD FAILED!");
            abort();
        }
    }
     //执行任务队列中的所有任务
    void RunAllTask() 
    {
        std::vector<Func> functor;
        {
            std::unique_lock<std::mutex> lock(_mutex);
            _tasks.swap(functor);
        }
        //执行任务
        for(auto& f:functor) f();
    }
public:
    EventLoop():_thread_id(std::this_thread::get_id()),_event_fd(CreateEventfd()),_event_channel(new Channel(this,_event_fd))
    ,_time_wheel(this)
    {
        //给eventChannel设置回调函数
        _event_channel->SetReadCallback(std::bind(&EventLoop::ReadEventFd,this));
        //启动读事件监控
        _event_channel->SetRead();
    }
    //如果要执行的任务在当前线程那就直接执行，不在就压入任务队列
    void RunInLoop(const Func& cb)
    {
        if(IsInloop()) cb();
        else QueueInLoop(cb);
    }
    //断言一个线程是否在当前线程中
    void AssertInLoop()
    {
        assert(_thread_id == std::this_thread::get_id());
    }
    //压入任务队列
    void QueueInLoop(const Func& cb)
    {
        {
            std::unique_lock<std::mutex> lock(_mutex);
            _tasks.push_back(cb);
        }
        //唤醒有可能因为事件还没有就绪的阻塞线程
        WeakUpEventFd();
    }
    //判断当前线程是否是EventLoop线程
    bool IsInloop() { return _thread_id == std::this_thread::get_id();}
    //添加/更新监控事件
    void UpdateEvent(Channel* channel) { return _poller.UpdateEvent(channel);}
    //移除监控事件
    void RemoveEvent(Channel* channel) { return _poller.RemoveEvent(channel);}
    //事件监控 ->就绪事件处理 -> 执行任务
    void Start() 
    {
        while(1)
        {
            std::vector<Channel*> actives;
            _poller.Poll(&actives);
            //就绪事件处理
            for(auto& a:actives)
            {
                a->HandleEvent();
            }
            //执行任务
            RunAllTask();
        }
    }
    void TimerAdd(uint64_t id,uint32_t delay,const OnCloseTime close_cb) { _time_wheel.AddTask(id,delay,close_cb); }
    void TimerRefresh(uint64_t id) { _time_wheel.RefreshTask(id); }
    void TimerCancel(uint64_t id) { _time_wheel.CancelTask(id); }
    bool HasTimer(uint64_t id) { return _time_wheel.HasTimer(id); }
};

class LoopThread
{
private:
    //实现获取loop和构造函数的同步关系，保证先实例化了loop之后才能获取
    std::mutex _mutex;
    std::condition_variable _cond;
    EventLoop* _loop; 
    std::thread _thread; //一个线程对应一个loop
private:
    void ThreadEntry()
    {
        //1.创建loop  2.通过条件变量来唤醒等待线程  3.运行
        EventLoop loop; //这里的临时变量生命周期跟随LoopThread
        {
            std::unique_lock<std::mutex> lock(_mutex);
            _cond.notify_all();
            _loop = &loop;
        }
        _loop->Start();
    }
public:
    LoopThread():_loop(nullptr),_thread(std::bind(&LoopThread::ThreadEntry,this)){}
    EventLoop* GetLoop()
    {
        EventLoop* loop;
        {
            std::unique_lock<std::mutex> lock(_mutex);
            //通过条件变量保证同步
            _cond.wait(lock,[&](){ return _loop != nullptr; });
            loop = _loop;
        }
        return loop;
    }
};

class LoopThreadPool
{
private:
    int _thread_count; //创建的LoopThread数量
    int _next_idx; //采用RR轮转的方式进行分配
    EventLoop* _base_loop; //主EventLoop跟随主线程
    std::vector<LoopThread*> _threads;  //管理全部的线程
    std::vector<EventLoop*> _loops;   //管理从属EventLoop
public:
    LoopThreadPool(EventLoop* loop):_base_loop(loop),_thread_count(0),_next_idx(0){}
    void SetThreadCount(int count) { _thread_count = count; }
    //根据数量来创建出对应的LoopThread
    void Create()
    {
        if(_thread_count>0)
        {
            _threads.resize(_thread_count);
            _loops.resize(_thread_count);
            for(int i = 0;i<_thread_count;++i)
            {
                _threads[i] = new LoopThread();
                _loops[i] = _threads[i]->GetLoop();
            }
        }
    }
    EventLoop* NextLoop()
    {
        if(_thread_count == 0) return _base_loop;
        _next_idx = (_next_idx + 1)% _thread_count;
        return _loops[_next_idx];
    }
};

class Any
{
private:
    //父类不是模板类，这样可以保证Any类不是模板
    class holer
    {
    public:
        virtual ~holer() {}
        virtual const std::type_info& type() = 0; //设置成纯虚函数，那么子类想要实例化就必须要重写虚函数 
        virtual holer* clone() = 0;
    };
    template <class T>
    class placeholer : public holer
    {
    public:
        placeholer(const T& val):_val(val){}
        virtual ~placeholer() {}
        virtual const std::type_info& type() { return typeid(T);}
        virtual placeholer* clone() { return new placeholer(_val);}
    public:
        T _val;
    };
private:
    Any& swap(Any& any)
    {
        std::swap(_holer,any._holer);
        return *this;
    }
    holer* _holer;
public:
    Any():_holer(nullptr) {}
    ~Any(){ delete _holer; }
    Any(const Any& any):_holer(any._holer == nullptr?nullptr:any._holer->clone()){}
    template <class T>
    Any(const T& val):_holer(new placeholer<T>(val)) {}
    template<class T>
    T* get()
    {
        assert(typeid(T) == _holer->type());
        return &((placeholer<T>*)_holer)->_val;
    }
    Any& operator=(Any& any)
    {
        Any(any).swap(*this);
        return *this;
    }
    template<class T>
    Any& operator=(const T& val)
    {
        Any(val).swap(*this);
        return *this;
    }
};

class Connection;

//DISCONNECTED  断开连接状态
//CONNECTING  连接建立，但是未完成全部工作的过渡态
//CONNECTED  连接建立完成，可以通信状态
//DISCONNECTING 连接待关闭状态，等待处理后序工作之后断开连接
typedef enum {DISCONNECTED,CONNECTING,CONNECTED,DISCONNECTING }ConnectStatus;
using PtrConnection = std::shared_ptr<Connection>;
class Connection : public std::enable_shared_from_this<Connection>
{
private:
    int _conn_id; //连接建立的唯一id
    //uint64_t _timer_id  这里为了简化使用，直接使用connid来作为timerid
    int _socketfd; //连接对应的文件描述符
    bool _enable_active_release; //是否启动非活跃连接超时销毁
    EventLoop* _loop; //连接所关联的线程
    Socket _socket; //套接字管理
    Channel _channel; //事件管理
    ConnectStatus _status; //连接状态
    Buffer _in_buffer; //输入缓冲区，从套接字中读取，然后放入到缓冲区中
    Buffer _out_buffer; //输出缓冲区，将待发送数据放到输出缓冲区
    Any _context; //连接上下文
private:
    //事件处理回调函数
    using ConnectCallback = std::function<void(const PtrConnection&)>;
    using MessageCallback = std::function<void(const PtrConnection&,Buffer*)>;
    using ClosedCallback = std::function<void(const PtrConnection&)>;
    using AnyEventCallback = std::function<void(const PtrConnection&)>;
    ConnectCallback _connect_callback;
    MessageCallback _msg_callback;
    ClosedCallback _closed_callback;
    AnyEventCallback _event_callback;
    //组件内关闭回调函数，组件内使用，因为使用智能指针进行Connect的管理，一旦关闭，就应该在管理的地方进行删除
    ClosedCallback _server_event_callback;
    //为了保证线程安全，每一个接口函数都应该放入到一个线程中
    void SendInLoop(Buffer& buf)
    {
        //这里并不是实际的发送接口，而是把数据放到发送缓冲区中
        if(_status == DISCONNECTED) return; 
        _out_buffer.WriteBufferAndPush(buf);
        //启动写事件监控
        if(_channel.WriteAble() == false) _channel.SetWrite();

    }
    void ShutDownInLoop()
    {
        //这里也不是实际的关闭操作，而是将待处理的数据处理，待发送的数据发送
        _status = DISCONNECTING; //处于待关闭状态
        if(_in_buffer.ReadAbleSize() > 0){
            if(_msg_callback) _msg_callback(shared_from_this(),&_in_buffer);
        }
        if(_out_buffer.ReadAbleSize() > 0)
        {
            //启动写事件监控
            if(_channel.WriteAble() == false) _channel.SetWrite();
        }
        //关闭连接，不管数据有没有处理完，因为这里的数据可能不完整，不需要处理了
        if(_out_buffer.ReadAbleSize() == 0) Release();
    }
    void ReleaseInLoop()
    {
        //1.修改连接状态
        _status = DISCONNECTED;
        //2.移除事件监控
        _channel.Remove();
        //3.关闭描述符
        _socket.Close();
        //4.取消定时销毁任务
        if(_loop->HasTimer(_conn_id)) CancelActiveReleaseInLoop();
        //5.调用关闭的回调函数
        if(_closed_callback) _closed_callback(shared_from_this());
        //组件内调用的关闭函数
        if(_server_event_callback) _server_event_callback(shared_from_this());
    }
    void EstablishedInLoop()
    {
        //1.修改状态
        assert(_status == CONNECTING);
        _status = CONNECTED;
        //2.启动读事件监控
        _channel.SetRead();
        //3.调用连接成功回调函数
        if(_connect_callback) _connect_callback(shared_from_this());
    }
    void SetActiveReleaseInloop(int sec)
    {
        //1.修改判断标志位
        _enable_active_release = true;
        //2.如果定时任务存在，那就延迟一下
        if(_loop->HasTimer(_conn_id)){
            return _loop->TimerRefresh(_conn_id);
        }
        //3.不存在就添加
        _loop->TimerAdd(_conn_id,sec,std::bind(&Connection::Release,this));
    }
    void CancelActiveReleaseInLoop()
    {
        _enable_active_release = false;
        if(_loop->HasTimer(_conn_id)) _loop->TimerCancel(_conn_id);
    }
    void UpgradeInLoop(const Any& context,const ConnectCallback& conn,const MessageCallback& msg, 
                const ClosedCallback& closed,const AnyEventCallback& event)
                {
                    //修改各个类成员变量即可
                    _context = context;
                    _connect_callback = conn;
                    _msg_callback = msg;
                    _closed_callback = closed;
                    _event_callback = event;
                }
    //五个Channel事件回调函数
    //将socket数据放到接收缓冲区中，调用message_callback进行消息的读取
    void HandlerRead()
    {
        //1.把数据放入到接收缓冲区
        char buffer[65536];
        ssize_t ret = _socket.NonBlockRecv(buffer,65535);
        if(ret < 0)
        {
            //出错了，不能直接关闭，而是调用ShutDownInLoop()
            return ShutDownInLoop();
        }
        //如果接收到的数据是0就不需要进行消息处理
        //2.调用message_callback
        _in_buffer.WriteAndPush(buffer,ret);
        if(_in_buffer.ReadAbleSize() > 0)
        {
            _msg_callback(shared_from_this(),&_in_buffer);
        }
    }
    //将发送缓冲区的数据发送
    void HandlerWrite()
    {
        //将发送缓冲区中的数据发送出去
        ssize_t ret = _socket.NonBlockSend(_out_buffer.ReadPosition(),_out_buffer.ReadAbleSize());
        if(ret < 0)
        {
            //出错了就看接收缓冲区有没有实际要处理的数据，有的话处理完就实际关闭连接
            if(_in_buffer.ReadAbleSize() > 0)
            {
                _msg_callback(shared_from_this(),&_in_buffer);
            }
            return Release();
        }
        //记得把写偏移移动
        _out_buffer.MoveReadOffSet(ret);
        if(_out_buffer.ReadAbleSize() == 0)
        {
            _channel.CloseWrite(); //关闭写事件监控
            //如果是连接待关闭状态则关闭连接
            if(_status == DISCONNECTING)
            {
                return Release();
            }
        }
    }
    //描述符挂断事件处理
    void HandlerClose()
    {
        if(_in_buffer.ReadAbleSize() > 0)
        {
            _msg_callback(shared_from_this(),&_in_buffer);
        }
        return Release();
    }
    //描述符错误事件处理
    void HandlerError() { HandlerClose(); }
    //描述符触发任意事件
    void HandlerEvent() { 
        //1.判断是否需要刷新活跃度
        if(_enable_active_release == true) { _loop->TimerRefresh(_conn_id);}
        //2.调用组件使用者的任意事件回调
        if(_event_callback) { return _event_callback(shared_from_this()); }
    }   

public:
    Connection(int connid,int socketfd,EventLoop* loop)
    :_conn_id(connid),_socketfd(socketfd),_enable_active_release(false),_loop(loop),
    _socket(socketfd),_channel(_loop,_socketfd),_status(CONNECTING)
    {
        //设置channel回调函数
        _channel.SetReadCallback(std::bind(&Connection::HandlerRead,this));
        _channel.SetWriteCallback(std::bind(&Connection::HandlerWrite,this));
        _channel.SetErrorCallback(std::bind(&Connection::HandlerError,this));
        _channel.SetCloseCallback(std::bind(&Connection::HandlerClose,this));
        _channel.SetEventCallback(std::bind(&Connection::HandlerEvent,this));
    }
    ~Connection() { DLOG("CONNECTION RELEASE :%p",this); }
    //成员变量的接口 
    int Fd() { return _socketfd; }
    //获取连接Id
    int Id() { return _conn_id; }
    //设置上下文
    void SetContext(const Any& context) { _context = context; }
    //获取上下文信息
    Any* Context() { return &_context; }
    //判断当前是否是连接状态
    bool IsConnected() { return _status == CONNECTED; }
    //设置各种回调函数
    void SetConnectCallback(const ConnectCallback& cb) { _connect_callback = cb; }
    void SetMessageCallback(const MessageCallback& cb) { _msg_callback = cb; }
    void SetClosedCallback(const ClosedCallback& cb) { _closed_callback = cb; }
    void SetEventCallback(const AnyEventCallback& cb) { _event_callback = cb; }
    void SetSvrCallback(const ClosedCallback& cb) { _server_event_callback = cb; }
    //发送数据,将数据放到发送缓冲区，启动写事件监控
    void Send(const char* data,size_t len)
    {
        //这里外面传过来的是一个临时对象，有可能会销毁，所以保存一份变量保证安全性
        Buffer buf;
        buf.WriteAndPush(data,len);
        _loop->RunInLoop(std::bind(&Connection::SendInLoop,this,std::move(buf)));
    }
    //这是提供给组件使用的关闭操作，但是并不是实际的关闭，需要内部进行处理
    void ShutDown()
    {
        _loop->RunInLoop(std::bind(&Connection::ShutDownInLoop,this));
    }
    //实际关闭连接的接口，销毁时应该放入任务池中，在执行完任务之后才销毁，否则可能会导致其他任务处理时，连接被释放导致内存访问错误
    void Release()
    {
        _loop->QueueInLoop(std::bind(&Connection::ReleaseInLoop,this));
    }
    //连接建立成功之后设置channel，启动读事件监控，调用_connect_callback
    void Established()
    {
        _loop->RunInLoop(std::bind(&Connection::EstablishedInLoop,this));
    }
    //启动非活跃连接销毁
    void SetActiveRelease(int sec)
    {
        _loop->RunInLoop(std::bind(&Connection::SetActiveReleaseInloop,this,sec));
    }
    //取消非活跃连接销毁
    void CancelActiveRelease()
    {
        _loop->RunInLoop(std::bind(&Connection::CancelActiveReleaseInLoop,this));

    }
    //协议切换
    void Upgrade(const Any& context,const ConnectCallback& conn,const MessageCallback& msg, 
                const ClosedCallback& closed,const AnyEventCallback& event)
                {
                    //因为协议切换是需要放在线程中，并且应该立即执行，否则用户切换协议之前的数据处理就没有意义了
                    _loop->AssertInLoop();
                    _loop->RunInLoop(std::bind(&Connection::UpgradeInLoop,this,context,conn,msg,closed,event));
                }
};


class Acceptor
{
private:
    EventLoop* _loop;
    Socket _socket;
    Channel _channel;
    using Accept_callback = std::function<void(int)>;
    Accept_callback _accept_cb;
private:
    int CreateSocket(int port)
    {
        bool ret = _socket.CreateSerber(port);
        assert(ret);
        return _socket.Fd();
    }
    void HandlerRead()
    {
        int newfd = _socket.Accept();
        if(newfd < 0) return;
        if(_accept_cb) _accept_cb(newfd);
    }
public:
    //构造函数不能立刻启用可读事件监控，否则这里有可能导致回调函数还没有设置，此时如果立刻有连接到来，会导致newfd没有得到处理，最终资源泄露
    Acceptor(EventLoop* loop,int port):_loop(loop),_socket(CreateSocket(port)),_channel(_loop,_socket.Fd())
    {
        _channel.SetReadCallback(std::bind(&Acceptor::HandlerRead,this));
    }
    void SetAcceptCallback(const Accept_callback& cb) { _accept_cb = cb; }
    //开始监听,启动读事件监控
    void Listen()
    {
        _channel.SetRead();
    }
};

class TcpServer
{
private:
    uint64_t _next_id; 
    int _timeout;  //销毁时间
    bool _enable_active_release; //是否其实非活跃超时销毁
    EventLoop _base_loop;  //主线程
    Acceptor _acceptor;  //监听套接字管理的对象
    LoopThreadPool _pool;  //从属线程
    std::unordered_map<uint64_t,PtrConnection> _conns; //管理连接

    using ConnectCallback = std::function<void(const PtrConnection&)>;
    using MessageCallback = std::function<void(const PtrConnection&,Buffer*)>;
    using ClosedCallback = std::function<void(const PtrConnection&)>;
    using AnyEventCallback = std::function<void(const PtrConnection&)>;
    using Functor = std::function<void()>;
    ConnectCallback _connect_callback;
    MessageCallback _msg_callback;
    ClosedCallback _closed_callback;
    AnyEventCallback _event_callback;
private:
    //新连接构造一个connection管理
    void NewConnection(int newfd)
    {
        ++_next_id;
        PtrConnection conn(new Connection(_next_id,newfd,_pool.NextLoop()));
        conn->SetConnectCallback(_connect_callback);
        conn->SetMessageCallback(_msg_callback);
        conn->SetClosedCallback(_closed_callback);
        conn->SetEventCallback(_event_callback);
        conn->SetSvrCallback(std::bind(&TcpServer::RemoveConnection,this,std::placeholders::_1));
        if(_enable_active_release) conn->SetActiveRelease(_timeout);
        conn->Established();
        _conns.insert(std::make_pair(_next_id,conn));
    }
    //关闭时调用，去除管理Connection
    void RemoveConnection(const PtrConnection& conn)
    {
        _base_loop.RunInLoop(std::bind(&TcpServer::RemoveConnectionInLoop,this,conn));
    }
    void RemoveConnectionInLoop(const PtrConnection& conn)
    {
        int id = conn->Id();
        _conns.erase(id);
    }
    void RunAfterInLoop(const Functor& func,int delay)
    {
        _next_id++;
        _base_loop.TimerAdd(_next_id,delay,func);
    }
public:
    TcpServer(int port):_next_id(0),_enable_active_release(false),_acceptor(&_base_loop,port),_pool(&_base_loop)
    {
        _acceptor.SetAcceptCallback(std::bind(&TcpServer::NewConnection,this,std::placeholders::_1));
        _acceptor.Listen(); //将套接字挂到loop中
    }
    void SetConnectCallback(const ConnectCallback& cb) { _connect_callback = cb; }
    void SetMessageCallback(const MessageCallback& cb) { _msg_callback = cb; }
    void SetClosedCallback(const ClosedCallback& cb) { _closed_callback = cb; }
    void SetEventCallback(const AnyEventCallback& cb) { _event_callback = cb; }
    //设置线程数量
    void SetThreadCount(int count) { _pool.SetThreadCount(count); }
    //设置非活跃超时销毁
    void SetActiveRelease(int timeout) { _timeout = timeout; _enable_active_release = true; }
    //添加一个任务
    void RunAfter(const Functor& func,int delay)
    {
        _base_loop.RunInLoop(std::bind(&TcpServer::RunAfterInLoop,this,func,delay));
    }
    void Start() { _pool.Create(); _base_loop.Start();}
};

void Channel::Remove(){ _loop->RemoveEvent(this);}
void Channel::Update(){ _loop->UpdateEvent(this);}

void TimeWheel::AddTask(uint64_t id,uint32_t delay,const OnCloseTime close_cb)
{
    _loop->RunInLoop(std::bind(&TimeWheel::AddTaskInLoop,this,id,delay,close_cb));
}
void TimeWheel::CancelTask(uint64_t id)
{
    _loop->RunInLoop(std::bind(&TimeWheel::CancelTaskInLoop,this,id));
}
void TimeWheel::RefreshTask(uint64_t id)
{
    _loop->RunInLoop(std::bind(&TimeWheel::RefreshTaskInLoop,this,id));
}


class NetWork
{
public: 
    NetWork()
    {
        DLOG("SIGPIPE INIT");
        signal(SIGPIPE,SIG_IGN);
    }
};
//防止信号导致的异常退出
static NetWork network;